package com.stars.distributed.schedule.core;

import com.stars.distributed.schedule.bean.BatchResult;
import com.stars.distributed.schedule.bean.DbScheduleSubTask;
import com.stars.distributed.schedule.bean.DbScheduleTask;
import com.stars.distributed.schedule.exception.DistributedScheduleMQException;
import com.stars.distributed.schedule.factory.DefaultThreadFactory;
import com.stars.distributed.schedule.mq.MQListener;
import com.stars.distributed.schedule.factory.MQListenerFactory;
import com.stars.distributed.schedule.record.AsynInsertTaskRecord;
import com.stars.distributed.schedule.constant.DistributedScheduleConstants;
import com.stars.distributed.schedule.service.DistributedScheduleService;
import com.stars.distributed.schedule.enums.*;
import com.stars.distributed.schedule.util.*;

import java.sql.Timestamp;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * 批量任务状态维护线程
 *
 * @author guoguifang
 */
class DistributedScheduler extends AbstractDistributedScheduleThread {

    /**
     * 服务器启动后是否第一次执行该线程任务
     */
    private volatile AtomicBoolean firstExecute = new AtomicBoolean(true);

    /**
     * 当服务器启动以及在配置页面创建主任务时需要初始化不匹配任务，其他时候不打开
     */
    private volatile AtomicBoolean needInitMismatchTask = new AtomicBoolean(true);

    /**
     * 是否优先处理可执行任务状态，服务器启动后第一次执行时默认先处理可执行任务状态的方法
     */
    private volatile AtomicBoolean prioritizeExecutableTaskStatus = new AtomicBoolean(true);

    /**
     * 默认线程空闲时间，单位：毫秒
     */
    private long defaultIdleTimeMillis;

    /**
     * 当前服务器MQ已启动监听器
     */
    private Map<String, MQListener> mqListenerMap = new ConcurrentHashMap<>(64);

    @Override
    protected void execute() {
        // 判断是否是第一次执行该任务，如果是则执行错误子任务状态修复(如果由于上次服务器的异常关闭可能会使任务状态出现错误的情况)
        if (firstExecute.compareAndSet(true, false)) {
            maintainOutOfWorkSubTask(true);
        }

        // 判断是否需要初始化不匹配任务(若不匹配则为新增任务，则初始化未启动MQ监听器的任务)，服务器初启动时必须初始化一次
        if (needInitMismatchTask.compareAndSet(true, false)) {
            maintainMismatchTask();
            maintainMQListener();
        }

        /*
         * 若优先处理可执行任务状态则先执行可执行任务的状态改变，然后判断是否有已完成或已失败任务，
         * 若有则再次执行可执行任务状态改变(因为可执行任务的依赖任务依赖于其依赖任务是否已完成或已失败)，
         * 为了防止其他线程执行可执行任务状态的修改在本系统无法获知所以在优先处理可执行任务状态的修改时通知任务抓取线程
         */
        if (prioritizeExecutableTaskStatus.compareAndSet(true, false)) {
            int executeCount = maintainExecutableTask();
            if (maintainFinishedTask() > 0) {
                executeCount = maintainExecutableTask();
            }
            if (executeCount == 0) {
                DistributedScheduleCatcher.getSingleInstance().signalIdle();
            }
        } else {
            maintainFinishedTask();
            maintainExecutableTask();
        }

        // 调度器闲置，若在处理之上两个方法时需要重新初始化任务则不等待
        if (!needInitMismatchTask.get()) {
            long idleTimeMillis = getIdleTimeMillis();
            if (idleTimeMillis >= 0 && idleTimeMillis <= defaultIdleTimeMillis) {
                prioritizeExecutableTaskStatus.set(true);
                awaitIdleMillis(idleTimeMillis);
            } else {
                awaitIdleMillis(defaultIdleTimeMillis);
            }
        }
    }

    /**
     * 描述：当服务器初启动时修复上一次服务器因突然关闭造成的失效子任务
     */
    public int maintainOutOfWorkSubTask(boolean firstExecute) {
        // 获取查询条件
        Map<String, Object> queryOutOfWorkSubTaskCondition = new HashMap<>(8);
        List<String> runStatusList = new ArrayList<>();
        runStatusList.add(TaskStatus.RUNNING.getCode());
        runStatusList.add(TaskStatus.RETRY_RUNNING.getCode());
        queryOutOfWorkSubTaskCondition.put("runStatusList", runStatusList);
        if (!firstExecute) {
            queryOutOfWorkSubTaskCondition.put("innerServer", "true");
            queryOutOfWorkSubTaskCondition.put("alive_no", YesOrNo.NO.getCode());
        } else {
            queryOutOfWorkSubTaskCondition.put("executeServerIp", getDistributedScheduleServerConfig().getIp());
            queryOutOfWorkSubTaskCondition.put("executeServerPort", getDistributedScheduleServerConfig().getPort());
        }
        List<DbScheduleSubTask> outOfWorkSubTaskList = DistributedScheduleService.getInstance().getOutOfWorkSubTask(queryOutOfWorkSubTaskCondition);
        if (outOfWorkSubTaskList == null || outOfWorkSubTaskList.size() == 0) {
            return 0;
        }

        // 遍历任务，如果状态是'处理中'则修复为'阻塞待处理'，如果状态是'失败重试-处理中'则修复为'失败重试-阻塞待处理'
        List<DbScheduleSubTask> updateSubTaskList = new ArrayList<>();
        for (DbScheduleSubTask outOfWorkSubTask : outOfWorkSubTaskList) {
            DbScheduleSubTask updateSubTask = outOfWorkSubTask.createUpdateSubTask();
            updateSubTask.setStatus(TaskStatus.forCode(outOfWorkSubTask.getStatus()) == TaskStatus.RUNNING ? TaskStatus.BLOCKED.getCode() : TaskStatus.RETRY_BLOCKED.getCode());
            updateSubTaskList.add(updateSubTask);
        }

        // 批量修改数据
        int updateSubTaskCount = updateSubTaskList.size();
        if (updateSubTaskCount > 0) {
            DistributedScheduleService.getInstance().batchUpdateDbScheduleSubTaskById(updateSubTaskList);
        }
        return updateSubTaskCount;
    }

    /**
     * 描述：维护主任务设置的分区数量和子任务分区数量不相等的任务，使两个分区数量相等
     */
    private void maintainMismatchTask() {
        List<DbScheduleTask> mismatchTaskList = DistributedScheduleService.getInstance().getMismatchTask(getQueryMismatchTaskCondition());
        if (mismatchTaskList == null || mismatchTaskList.size() == 0) {
            return;
        }

        // 遍历并初始化主任务、子任务分区数量不匹配的任务
        for (DbScheduleTask mismatchTask : mismatchTaskList) {
            initDbScheduleTask(mismatchTask);
        }
    }

    /**
     * 描述：维护MQ监听器
     */
    private void maintainMQListener() {
        List<DbScheduleTask> needMQListenerTaskList = DistributedScheduleService.getInstance().getNeedMQListenerTask(getNeedMQListenerTaskCondition());
        if (needMQListenerTaskList == null || needMQListenerTaskList.size() == 0) {
            return;
        }

        // 遍历需要启动MQ监听服务的主任务，若当前服务器已经启动监听则跳过
        for (DbScheduleTask needMQListenerTask : needMQListenerTaskList) {
            String mqConfigId = needMQListenerTask.getMqConfigId();
            String mqCallback = needMQListenerTask.getMqCallback();
            String key = StringUtils.join(new Object[]{mqConfigId, mqCallback}, "-");
            MQListener mqListener = mqListenerMap.get(key);
            if (mqListener != null) {
                continue;
            }
            // 若当前服务器未启动该监听则根据任务类型创建相应的ActiveMQ或者RocketMQ
            MQ mq = MQ.forCode(needMQListenerTask.getMqType());
            String mqConnection = needMQListenerTask.getMqConnection();
            if (mq == MQ.ACTIVEMQ) {
                mqListener = MQListenerFactory.getActiveMQListener(mqConnection, needMQListenerTask.getMqUser(), needMQListenerTask.getMqPassword(), mqCallback);
            } else {
                mqListener = MQListenerFactory.getRocketMQListener(mqConnection, needMQListenerTask.getMqConsumerGroupName(), mqCallback);
            }
            if (mqListener != null) {
                try {
                    mqListener.start();
                    mqListenerMap.put(key, mqListener);
                } catch (DistributedScheduleMQException e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    }

    /**
     * 描述：已到执行时间的任务以及失败后需要重试的任务的状态维护
     */
    private int maintainExecutableTask() {
        List<DbScheduleTask> executableTaskList = DistributedScheduleService.getInstance().getExecutableTask(getQueryExecutableTaskCondition());
        if (executableTaskList == null || executableTaskList.size() == 0) {
            return 0;
        }

        // 遍历获取的任务并分配到对应的方法中
        final List<Map<String, Object>> updateTaskList = new ArrayList<>();
        final List<DbScheduleSubTask> updateSubTaskList = new ArrayList<>();
        List<Future<Boolean>> futureList = new ArrayList<>();
        for (DbScheduleTask executableTask : executableTaskList) {
            if (executableTask.getId() == null) {
                lastScheduleDeadline = executableTask.getCurrentTime();
                continue;
            }
            TaskStatus taskStatus = TaskStatus.forCode(executableTask.getStatus());
            if (taskStatus == TaskStatus.ERROR && ErrorPolicy.forCode(executableTask.getErrorPolicy()) == ErrorPolicy.RETRY) {
                maintainFailedTask(executableTask, updateTaskList, updateSubTaskList);
            } else {
                // 如果有依赖任务的采用异步线程的方式处理
                String dependentTaskIds = executableTask.getDependentTaskId();
                if (StringUtils.isNotBlank(dependentTaskIds)) {
                    futureList.add(getAsynExecutorService().submit(new Callable<Boolean>() {
                        @Override
                        public Boolean call() throws Exception {
                            maintainExecutableTask(executableTask, updateTaskList, updateSubTaskList);
                            return Boolean.TRUE;
                        }
                    }));
                } else {
                    maintainExecutableTask(executableTask, updateTaskList, updateSubTaskList);
                }
            }
        }

        // 异步处理有依赖任务的任务
        if (futureList.size() > 0) {
            for (Future<Boolean> future : futureList) {
                try {
                    future.get();
                } catch (Exception ignore) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("The thread pools of handle dependent task execute task fail!", ignore);
                    }
                }
            }
        }

        // 批量修改主任务及子任务状态
        int updateCount = updateTaskList.size();
        if (updateTaskList.size() > 0) {
            DistributedScheduleService.getInstance().batchUpdateDbScheduleTaskByIdAndVersion(updateTaskList);

            // 批量修改子任务状态
            if (updateSubTaskList.size() > 0) {
                DistributedScheduleService.getInstance().batchUpdateDbScheduleSubTaskById(updateSubTaskList);
            }

            // 如果有需要处理的任务则需要通知任务抓取器线程
            DistributedScheduleCatcher.getSingleInstance().signalIdle();

            // 记录任务日志信息
            AsynInsertTaskRecord.add(updateTaskList);
        }
        return updateCount;
    }

    /**
     * 描述：维护完成任务状态：
     *  当所有子任务状态均为完成时,修改任务状态："处理中" -> "处理完成";
     *  当有子任务状态为 "处理失败" 时,修改任务状态："处理中" -> "处理失败"，如果失败策略是重试的话则设置下次重试时间
     *
     * @return 本次修改的已完成任务数量
     */
    private int maintainFinishedTask() {
        List<DbScheduleTask> finishedTaskList = DistributedScheduleService.getInstance().getFinishedTask(getQueryFinishedTaskCondition());
        if (finishedTaskList == null || finishedTaskList.size() == 0) {
            return 0;
        }

        // 遍历所有任务
        List<Map<String, Object>> updateList = new ArrayList<>();
        for (DbScheduleTask finishedTask : finishedTaskList) {
            // 封装修改参数
            Map<String, Object> updateMap = new HashMap<>(4);
            DbScheduleTask updateTask = finishedTask.createUpdateTask();
            updateMap.put("dto", updateTask);

            // 得到该任务下的所有子任务，如果子任务不存在则修改主任务状态为"未初始化"
            List<DbScheduleSubTask> subTaskList = finishedTask.getSubTaskList();
            if (subTaskList == null || subTaskList.size() == 0) {
                updateTask.setStatus(TaskStatus.NOT_INIT.getCode());
                needInitMismatchTask();
                updateMap.put("init", "true");
            } else {
                // 遍历所有子任务并分析其状态是异常还是完成
                boolean finish = true, failed = false;
                for (DbScheduleSubTask subTask : subTaskList) {
                    TaskStatus subTaskStatus = TaskStatus.forCode(subTask.getStatus());
                    if (TaskStatus.ERROR != subTaskStatus && TaskStatus.COMPLETE != subTaskStatus) {
                        finish = false;
                        break;
                    } else if (TaskStatus.ERROR == subTaskStatus) {
                        failed = true;
                    }
                }

                // 如果子任务全部执行完成则判断是否有执行失败的子任务，若有则任务为失败否则为成功
                if (finish) {
                    if (!failed) {
                        updateTask.setStatus(TaskStatus.COMPLETE.getCode());
                        // 任务成功把重试次数置为0
                        updateTask.setRetryIndex(0);
                    } else {
                        updateTask.setStatus(TaskStatus.ERROR.getCode());
                        // 如果失败策略是重试的话则设置下次重试时间
                        if (ErrorPolicy.forCode(finishedTask.getErrorPolicy()) == ErrorPolicy.RETRY) {
                            // 获取重试总次数和已重试次数
                            int retryCount = finishedTask.getRetryCount();
                            int retryIndex = finishedTask.getRetryIndex();
                            // 最小重试次数是1次，如果数据异常修正为默认次数1次
                            if (retryCount <= 0) {
                                retryCount = 1;
                                updateTask.setRetryCount(retryCount);
                            }
                            // 如果重试次数小于0则修正为默认次数0次
                            if (retryIndex < 0) {
                                retryIndex = 0;
                                updateTask.setRetryIndex(retryIndex);
                            }
                            // 若已达最高重试次数则不再重试
                            if (retryIndex < retryCount) {
                                // 获取重试间隔，如果数据错误默认1分钟
                                Long retryIntervalMilli = TimeUtil.parseToMilliseconds(finishedTask.getRetryInterval());
                                if (retryIntervalMilli == null || retryIntervalMilli < 0) {
                                    retryIntervalMilli = 60000L;
                                    updateTask.setRetryInterval("1m");
                                }
                                // 重试策略并根据重试策略获取具体重试时间
                                RetryPolicy retryPolicy = RetryPolicy.forCode(finishedTask.getRetryPolicy());
                                if (retryPolicy == RetryPolicy.NONE) {
                                    updateMap.put("nextFireIncrementTime", retryIntervalMilli);
                                } else if (retryPolicy == RetryPolicy.GRADUAL) {
                                    updateMap.put("nextFireIncrementTime", retryIntervalMilli * (retryIndex + 1));
                                } else {
                                    updateMap.put("nextFireIncrementTime", (long) (retryIntervalMilli * Math.pow(2, retryIndex)));
                                }
                            }
                        }
                    }
                    updateMap.put("finish", "true");
                }
            }

            // 如果需要修改主任务状态则修改数据库数据
            if (updateTask.getStatus() != null) {
                updateList.add(updateMap);
            }
        }

        // 批量更新任务状态
        int updateCount = updateList.size();
        if (updateCount > 0) {
            DistributedScheduleService.getInstance().batchUpdateDbScheduleTaskByIdAndVersion(updateList);

            // 异步处理日志信息
            getAsynExecutorService().execute(new Runnable() {
                @Override
                public void run() {
                    DistributedScheduleService.getInstance().updateDbScheduleTaskRecord();
                }
            });
        }
        return updateCount;
    }

    /**
     * 描述：维护已到执行时间的任务状态(时间以数据库时间为准)
     * 1.检索可执行任务，条件为：nextFireTime <= currentTime && （status = "处理完成" || "待命" || "处理异常但异常策略为忽略"）
     * 2.检查依赖任务
     * 3.修改任务状态 "处理完成" -> "阻塞待处理" , currentFireTime = null, currentFinishTime = null, nextFireTime = cronExpression.getTimeAfter(currentTime)
     * 4.修改子任务状态 "处理完成" -> "阻塞待处理"
     */
    private void maintainExecutableTask(final DbScheduleTask executableTask, final List<Map<String, Object>> updateTaskList, final List<DbScheduleSubTask> updateSubTaskList) {

        // 封装修改参数
        Map<String, Object> updateTaskMap = new HashMap<>(4);
        DbScheduleTask updateTask = executableTask.createUpdateTask();
        updateTaskMap.put("dto", updateTask);

        // 得到该任务下的所有子任务，如果子任务不存在则修改主任务状态为"未初始化"
        List<DbScheduleSubTask> subTaskList = executableTask.getSubTaskList();
        if (subTaskList == null || subTaskList.size() == 0) {
            updateTask.setStatus(TaskStatus.NOT_INIT.getCode());
            updateTaskMap.put("init", "true");
            updateTaskList.add(updateTaskMap);
            needInitMismatchTask();
            return;
        }

        // 设置下次执行时间
        if (!setNextFireTime(executableTask, updateTask)) {
            updateTaskList.add(updateTaskMap);
            return;
        }

        /*
         * 检查是否有依赖任务，若有则判断所有依赖任务是否执行完毕:
         * 若依赖任务执行失败则查询器依赖任务的依赖失败策略，若依赖失败策略为跳过则可以继续执行
         */
        String dependentTaskIds = executableTask.getDependentTaskId();
        if (StringUtils.isNotBlank(dependentTaskIds)) {

            // 得到依赖任务的主任务ID，并根据主任务ID得到对应的任务信息
            List<Long> dependentTaskIdList = new ArrayList<>();
            String[] dependentTaskIdArray = dependentTaskIds.trim().split(",");
            for (String dependentTaskIdStr : dependentTaskIdArray) {
                dependentTaskIdStr = dependentTaskIdStr.trim();
                if (!StringUtils.isNumeric(dependentTaskIdStr)) {
                    updateTask.setStatus(TaskStatus.FATAL.getCode());
                    updateTask.setErrorMessage("This task dependent_task_id has wrong task id!");
                    updateTaskList.add(updateTaskMap);
                    return;
                }
                dependentTaskIdList.add(Long.parseLong(dependentTaskIdStr));
            }
            Map<String, Object> queryMap = new HashMap<>(8);
            queryMap.put("idList", dependentTaskIdList);
            List<DbScheduleTask> dependentTaskList = DistributedScheduleService.getInstance().getDependentTask(queryMap);

            /*
             * 1.如果没有查询到相应的依赖任务、查询到的依赖任务数量与查询条件的条数不符合，则把主任务状态改为"致命错误"并记录错误信息
             * 2.强依赖：如果依赖任务开关关闭则依赖不通过，如果依赖任务状态为"处理完成"并且下次执行时间大于当前时间则依赖通过，否则都为依赖不通过
             *   弱依赖：如果依赖任务开关关闭或者开关打开时依赖任务状态为"处理完成"、"处理异常"、"致命错误"并且下次执行时间大于当前时间，则依赖通过，否则都为依赖不通过
             */
            if (dependentTaskList == null || dependentTaskList.size() != dependentTaskIdArray.length) {
                updateTask.setStatus(TaskStatus.FATAL.getCode());
                updateTask.setErrorMessage("This task dependent_task_id has wrong task id!");
                updateTaskList.add(updateTaskMap);
                return;
            }
            boolean policyThrough = true;
            if (DependencePolicy.STRONG == DependencePolicy.forCode(executableTask.getDependencePolicy())) {
                for (DbScheduleTask dependentTask : dependentTaskList) {
                    if (Switch.forCode(dependentTask.getTaskSwitch()) == Switch.CLOSE) {
                        policyThrough = false;
                        break;
                    }
                    TaskStatus dependentTaskStatus = TaskStatus.forCode(dependentTask.getStatus());
                    if (TaskStatus.COMPLETE == dependentTaskStatus) {
                        if (dependentTask.getNextFireTime().compareTo(dependentTask.getCurrentTime()) <= 0) {
                            policyThrough = false;
                            break;
                        }
                    } else {
                        policyThrough = false;
                        break;
                    }
                }
            } else {
                for (DbScheduleTask dependentTask : dependentTaskList) {
                    if (Switch.forCode(dependentTask.getTaskSwitch()) == Switch.OPEN) {
                        TaskStatus dependentTaskStatus = TaskStatus.forCode(dependentTask.getStatus());
                        if (TaskStatus.COMPLETE == dependentTaskStatus || TaskStatus.ERROR == dependentTaskStatus
                                || TaskStatus.FATAL == dependentTaskStatus) {
                            if (dependentTask.getNextFireTime().compareTo(dependentTask.getCurrentTime()) <= 0) {
                                policyThrough = false;
                                break;
                            }
                        } else {
                            policyThrough = false;
                            break;
                        }
                    }
                }
            }

            if (!policyThrough) {
                return;
            }
        }

        // 更新任务状态为"阻塞待处理"状态，并设置一般批次号
        updateTask.setStatus(TaskStatus.BLOCKED.getCode());
        updateTask.setBatchNo(GenerateUtil.getTaskBatchNo("N"));
        updateTaskMap.put("prevBatchNo", executableTask.getBatchNo());
        updateTaskMap.put("block", "true");
        updateTaskList.add(updateTaskMap);

        // 循环遍历更新子任务状态为"阻塞待处理"状态
        for (DbScheduleSubTask subTask : subTaskList) {
            DbScheduleSubTask updateSubTask = subTask.createUpdateSubTask();
            updateSubTask.setStatus(TaskStatus.BLOCKED.getCode());
            updateSubTaskList.add(updateSubTask);
        }
    }

    /**
     * 描述：维护任务异常后需要重试的任务
     * 1.检索任务,条件为 任务状态："处理异常" && 异常策略： "重试"
     * 2.判断该任务是否需要失败重试，若需要重试则设置任务状态为"N",失败子任务状态为"N"
     */
    private void maintainFailedTask(final DbScheduleTask failedTask, final List<Map<String, Object>> updateTaskList, final List<DbScheduleSubTask> updateSubTaskList) {

        // 封装修改参数
        Map<String, Object> updateMap = new HashMap<>(4);
        DbScheduleTask updateTask = failedTask.createUpdateTask();
        updateMap.put("dto", updateTask);

        // 设置下次执行时间
        if (!setNextFireTime(failedTask, updateTask)) {
            updateTaskList.add(updateMap);
            return;
        }

        // 更新主任务的状态以及当前重试次数
        updateTask.setRetryIndex(failedTask.getRetryIndex() + 1);
        updateTask.setStatus(TaskStatus.RETRY_BLOCKED.getCode());
        updateTask.setBatchNo(GenerateUtil.getTaskBatchNo("R"));
        updateMap.put("prevBatchNo", failedTask.getBatchNo());
        updateMap.put("block", "true");
        updateTaskList.add(updateMap);

        // 循环遍历更新子任务状态为"失败重试-阻塞待处理"状态
        List<DbScheduleSubTask> subTaskList = failedTask.getSubTaskList();
        for (DbScheduleSubTask subTask : subTaskList) {
            if (TaskStatus.forCode(subTask.getStatus()) == TaskStatus.ERROR) {
                DbScheduleSubTask updateSubTask = subTask.createUpdateSubTask();
                updateSubTask.setStatus(TaskStatus.RETRY_BLOCKED.getCode());
                updateSubTaskList.add(updateSubTask);
            }
        }
    }

    /**
     * 描述：初始化主任务、子任务分区数量不匹配的任务
     */
    public void initDbScheduleTask(DbScheduleTask mismatchTask) {
        // 封装修改参数
        Map<String, Object> updateMap = new HashMap<>(4);
        DbScheduleTask updateTask = mismatchTask.createUpdateTask();
        updateMap.put("dto", updateTask);

        // 如果主任务分区数量小于1则修正为1再比较，并将主任务分区数量修改为1
        int partitionCount = mismatchTask.getPartitionCount();
        if (partitionCount < DistributedScheduleConstants.MIN_PARTITION_COUNT) {
            partitionCount = DistributedScheduleConstants.MIN_PARTITION_COUNT;
            updateTask.setPartitionCount(DistributedScheduleConstants.MIN_PARTITION_COUNT);
        } else if (partitionCount > DistributedScheduleConstants.MAX_PARTITION_COUNT) {
            partitionCount = DistributedScheduleConstants.MAX_PARTITION_COUNT;
            updateTask.setPartitionCount(DistributedScheduleConstants.MAX_PARTITION_COUNT);
        }

        // 判断主任务分区数量是否与子任务分区数量相等
        int subTaskCount = mismatchTask.getSubTaskList() != null ? mismatchTask.getSubTaskList().size() : 0;
        if (partitionCount == subTaskCount) {
            return;
        }

        // 修改任务状态为'正在初始化'状态
        updateTask.setStatus(TaskStatus.INITING.getCode());

        int updateCount = DistributedScheduleService.getInstance().updateDbScheduleTaskByIdAndVersion(updateMap);
        if (updateCount > 0) {
            // 如果存在旧的子任务则需要先删除旧的子任务
            if (subTaskCount > 0) {
                DistributedScheduleService.getInstance().deleteDbScheduleSubTaskByParentId(mismatchTask.getId());
            }
            // 根据分区数量获取等分区数量的子任务
            List<DbScheduleSubTask> subTaskList = new ArrayList<>();
            for (int index = 0; index < partitionCount; index++) {
                DbScheduleSubTask subTask = new DbScheduleSubTask();
                subTask.setId(Long.parseLong(mismatchTask.getId() + StringUtils.leftPad(String.valueOf(index), 3, '0')));
                subTask.setParentId(mismatchTask.getId());
                subTask.setPartitionCount(partitionCount);
                subTask.setPartitionIndex(index);
                subTask.setStatus(TaskStatus.STANDBY.getCode());
                subTaskList.add(subTask);
            }
            BatchResult batchResult = DistributedScheduleService.getInstance().batchInsertDbScheduleSubTask(subTaskList);
            updateTask.setStatus(TaskStatus.STANDBY.getCode());
            if (batchResult.getSuccessCount() != partitionCount) {
                updateTask.setStatus(TaskStatus.INIT_FAIL.getCode());
            }
            updateTask.setVersion(updateTask.getVersion() + 1);
            updateCount = DistributedScheduleService.getInstance().updateDbScheduleTaskByIdAndVersion(updateMap);
            if (updateCount > 0 && logger.isDebugEnabled()) {
                logger.debug("Thread[{}] task[{}] status change for '{}', task initialization success!", getThreadName(), mismatchTask.getTaskId(),
                        TaskStatus.forCode(mismatchTask.getStatus()), TaskStatus.forCode(updateTask.getStatus()));
            }
        }
    }

    /**
     * 描述：获取调度器线程空闲时间
     */
    private long getIdleTimeMillis() {
        DbScheduleTask nearestExecutableTask = DistributedScheduleService.getInstance().getNearestExecutableTask(getQueryNearestExecutableTaskCondition());
        if (nearestExecutableTask != null) {
            Timestamp nextFireTime = nearestExecutableTask.getNextFireTime();
            if (nextFireTime != null) {
                Timestamp currentTime = nearestExecutableTask.getCurrentTime();
                long idleTimeMillis = nextFireTime.getTime() - currentTime.getTime();
                return idleTimeMillis < 0 ? 0 : idleTimeMillis;
            }
        }
        return -1;
    }

    /**
     * 描述：设置下次任务执行时间
     */
    private boolean setNextFireTime(DbScheduleTask dbScheduleTask, DbScheduleTask updateTask) {
        // 判断是否是一次性任务,如果cronText为空则为一次性任务
        String cronExpression = dbScheduleTask.getCronExpression();
        if (StringUtils.isNotBlank(cronExpression)) {
            // 得到当前时间
            Date currentTime = new Date();
            Timestamp dbCurrentTime = dbScheduleTask.getCurrentTime();
            if (dbCurrentTime != null) {
                currentTime = new Date(dbCurrentTime.getTime());
            }

            // nextFireTime更新为系统当前时间之后计算出的最近的一个的时间，如果计算错误则把主任务状态改为"致命错误"
            Date nextFireDate = CronExpressionUtil.getNextFireDate(cronExpression, currentTime);
            if (nextFireDate == null) {
                updateTask.setStatus(TaskStatus.FATAL.getCode());
                updateTask.setErrorMessage("This task corn_text value is wrong!");
                return false;
            }
            updateTask.setNextFireTime(new Timestamp(nextFireDate.getTime()));
        }
        return true;
    }

    private Map<String, Object> queryMismatchTaskCondition;

    /**
     * 得到查询条件：状态为未初始化、待命、处理完成、处理异常但异常策略为忽略的状态的主任务分区数量与子任务分区数量不相等的任务
     */
    private Map<String, Object> getQueryMismatchTaskCondition() {
        if (queryMismatchTaskCondition == null) {
            queryMismatchTaskCondition = new HashMap<>(8);
            List<String> queryStatusList = new ArrayList<>();
            queryStatusList.add(TaskStatus.NOT_INIT.getCode());
            queryStatusList.add(TaskStatus.STANDBY.getCode());
            queryStatusList.add(TaskStatus.COMPLETE.getCode());
            queryMismatchTaskCondition.put("statusList", queryStatusList);
            queryMismatchTaskCondition.put("errorStatus", TaskStatus.ERROR.getCode());
            queryMismatchTaskCondition.put("errorPolicy_ignore", ErrorPolicy.IGNORE.getCode());
            queryMismatchTaskCondition.put("taskSwitchOpen", Switch.OPEN.getCode());
        }
        return queryMismatchTaskCondition;
    }

    private Map<String, Object> queryNeedMQListenerTaskCondition;

    /**
     * 得到查询条件：任务类型为ActiveMQ或者RocketMQ的任务
     */
    private Map<String, Object> getNeedMQListenerTaskCondition() {
        if (queryNeedMQListenerTaskCondition == null) {
            queryNeedMQListenerTaskCondition = new HashMap<>(4);
            queryNeedMQListenerTaskCondition.put("mqTaskType", TaskType.MQ.getCode());
            queryNeedMQListenerTaskCondition.put("taskSwitchOpen", Switch.OPEN.getCode());
        }
        return queryNeedMQListenerTaskCondition;
    }

    private Map<String, Object> queryExecutableTaskCondition;

    /**
     * 得到查询条件
     */
    private Map<String, Object> getQueryExecutableTaskCondition() {
        if (queryExecutableTaskCondition == null) {
            queryExecutableTaskCondition = new HashMap<>(16);
            List<String> queryStatusList = new ArrayList<>();
            queryStatusList.add(TaskStatus.STANDBY.getCode());
            queryStatusList.add(TaskStatus.COMPLETE.getCode());
            queryExecutableTaskCondition.put("statusList", queryStatusList);
            queryExecutableTaskCondition.put("errorStatus", TaskStatus.ERROR.getCode());
            List<String> errorPolicyList = new ArrayList<>();
            errorPolicyList.add(ErrorPolicy.IGNORE.getCode());
            errorPolicyList.add(ErrorPolicy.RETRY.getCode());
            queryExecutableTaskCondition.put("errorPolicyList", errorPolicyList);
            List<String> runStatusList = new ArrayList<>();
            runStatusList.add(TaskStatus.BLOCKED.getCode());
            runStatusList.add(TaskStatus.RETRY_BLOCKED.getCode());
            runStatusList.add(TaskStatus.RUNNING.getCode());
            runStatusList.add(TaskStatus.RETRY_RUNNING.getCode());
            queryExecutableTaskCondition.put("runStatusList", runStatusList);
            queryExecutableTaskCondition.put("blockPolicy_idempotent", BlockPolicy.IDEMPOTENT.getCode());
            queryExecutableTaskCondition.put("taskSwitchOpen", Switch.OPEN.getCode());
        }
        return queryExecutableTaskCondition;
    }

    private Map<String, Object> queryFinishedTaskCondition;

    /**
     * 得到查询条件：查询所有主任务状态为"处理中"或"异常重试-处理中"、所有子任务状态为"处理异常"或"处理完成"的任务
     */
    private Map<String, Object> getQueryFinishedTaskCondition() {
        if (queryFinishedTaskCondition == null) {
            queryFinishedTaskCondition = new HashMap<>(4);
            List<String> runningStatusList = new ArrayList<>();
            runningStatusList.add(TaskStatus.RUNNING.getCode());
            runningStatusList.add(TaskStatus.RETRY_RUNNING.getCode());
            queryFinishedTaskCondition.put("runningStatusList", runningStatusList);
            List<String> finishStatusList = new ArrayList<>();
            finishStatusList.add(TaskStatus.ERROR.getCode());
            finishStatusList.add(TaskStatus.COMPLETE.getCode());
            queryFinishedTaskCondition.put("finishStatusList", finishStatusList);
            queryFinishedTaskCondition.put("taskSwitchOpen", Switch.OPEN.getCode());
        }
        return queryFinishedTaskCondition;
    }

    /**
     * 上一次调度器的判断时间
     */
    private Timestamp lastScheduleDeadline;

    /**
     * 得到查询条件：得到距离当前时间最近的可执行任务的下次执行时间
     */
    private Map<String, Object> getQueryNearestExecutableTaskCondition() {
        Map<String, Object> queryNearestExecutableTaskCondition = getQueryExecutableTaskCondition();
        queryNearestExecutableTaskCondition.put("lastScheduleDeadline", lastScheduleDeadline);
        return queryNearestExecutableTaskCondition;
    }

    /**
     * 异步处理任务的线程池
     */
    private ExecutorService asynExecutorService;

    /**
     * 异步处理任务的线程池：用于处理依赖任务或者处理日志信息
     */
    private ExecutorService getAsynExecutorService() {
        if (asynExecutorService == null) {
            ThreadFactory namedThreadFactory = new DefaultThreadFactory().setNameFormat(getThreadName() + ":dependent").build();
            asynExecutorService = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(5), namedThreadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
        }
        return asynExecutorService;
    }

    /**
     * 当在页面创建主任务时需要初始化不匹配任务
     */
    public void needInitMismatchTask() {
        this.needInitMismatchTask.set(true);
        signalIdle();
    }

    private static DistributedScheduler singleInstance;

    static DistributedScheduler getSingleInstance() {
        if (singleInstance == null) {
            synchronized (DistributedScheduler.class) {
                if (singleInstance == null) {
                    singleInstance = new DistributedScheduler();
                }
            }
        }
        return singleInstance;
    }

    private DistributedScheduler() {
        super();
        this.defaultIdleTimeMillis = DistributedScheduleConstants.DEFAULT_IDLE_TIME_MILLI;
    }
}